use std::collections::VecDeque;

use arrow::legacy::time_zone::Tz;
use arrow::temporal_conversions::{
    timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_us_to_datetime,
};
use arrow::trusted_len::TrustedLen;
use chrono::NaiveDateTime;
#[cfg(feature = "timezones")]
use chrono::TimeZone as _;
use now::DateTimeNow;
use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::utils::_split_offsets;
use polars_core::utils::flatten::flatten_par;
use rayon::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use strum_macros::IntoStaticStr;

use crate::prelude::*;

// Which endpoints of a time window are inclusive.
//
// Converted to a snake_case string ("left", "right", "both", "none") through the
// `IntoStaticStr` derive. Plain `//` comments are used on purpose so that the schema generated
// by the `dsl-schema` feature does not pick them up as descriptions.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
pub enum ClosedWindow {
    // Lower bound inclusive, upper bound exclusive: `[start, stop)`.
    Left,
    // Lower bound exclusive, upper bound inclusive: `(start, stop]`.
    Right,
    // Both bounds inclusive: `[start, stop]`.
    Both,
    // Both bounds exclusive: `(start, stop)`.
    None,
}

// Which timestamp is used to label a window.
// NOTE(review): this enum is consumed outside this chunk; presumably `Left`/`Right` label a
// window by its lower/upper bound and `DataPoint` by a timestamp from the data — confirm.
// Converted to a snake_case string via `IntoStaticStr`. Plain `//` comments keep the
// `dsl-schema` output unchanged.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
pub enum Label {
    Left,
    Right,
    DataPoint,
}

// Strategy for choosing where the first window starts; `WindowBound` is the default.
// NOTE(review): the meaning of `WindowBound`/`DataPoint` is implemented outside this chunk.
// The weekday variants map to indices 0 (Monday) ..= 6 (Sunday) via `StartBy::weekday`.
// Plain `//` comments keep the `dsl-schema` output unchanged.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
#[derive(Default)]
pub enum StartBy {
    #[default]
    WindowBound,
    DataPoint,
    // The weekday variants (Monday ..= Sunday) are only useful if periods are weekly.
    /// only useful if periods are weekly
    Monday,
    Tuesday,
    Wednesday,
    Thursday,
    Friday,
    Saturday,
    Sunday,
}

impl StartBy {
    /// Zero-based day of the week for the weekday variants (`Monday` = 0 … `Sunday` = 6).
    ///
    /// Returns `None` for `WindowBound` and `DataPoint`.
    pub fn weekday(&self) -> Option<u32> {
        let day = match self {
            StartBy::WindowBound | StartBy::DataPoint => return None,
            StartBy::Monday => 0,
            StartBy::Tuesday => 1,
            StartBy::Wednesday => 2,
            StartBy::Thursday => 3,
            StartBy::Friday => 4,
            StartBy::Saturday => 5,
            StartBy::Sunday => 6,
        };
        Some(day)
    }
}

/// Assigns the values of `time` to the windows yielded by `bounds_iter`.
///
/// For each window that receives a group, `[first_index, len]` (indices into `time`) is pushed to
/// `groups`. When `include_lower_bound`/`include_upper_bound` is set, the window's `start`/`stop`
/// are pushed to `lower_bound`/`upper_bound` at the same time, so the vectors stay aligned.
///
/// Some windows produce no group:
/// - a window that `is_future` reports as lying behind the current value;
/// - the window being examined once the scan has reached the last value, if it does not
///   contain that value.
///
/// `start` is the index in `time` at which the scan begins; it only moves forward.
///
/// NOTE(review): `time` is assumed sorted ascending. It must be non-empty whenever `bounds_iter`
/// yields a window, because `time.len() - 1` is evaluated for every window.
#[allow(clippy::too_many_arguments)]
fn update_groups_and_bounds(
    bounds_iter: BoundsIter<'_>,
    mut start: usize,
    time: &[i64],
    closed_window: ClosedWindow,
    include_lower_bound: bool,
    include_upper_bound: bool,
    lower_bound: &mut Vec<i64>,
    upper_bound: &mut Vec<i64>,
    groups: &mut Vec<[IdxSize; 2]>,
) {
    let mut iter = bounds_iter.into_iter();
    // Number of windows to skip before taking the next one (see `get_stride` below).
    let mut stride = 0;

    'bounds: while let Some(bi) = iter.nth(stride) {
        let mut has_member = false;
        // find starting point of window
        // (the last value is excluded here; it is handled separately below)
        for &t in &time[start..time.len().saturating_sub(1)] {
            // the window is behind the time values.
            if bi.is_future(t, closed_window) {
                // Skip ahead to the window that may contain `t`; no group for this one.
                stride = iter.get_stride(t);
                continue 'bounds;
            }
            if bi.is_member_entry(t, closed_window) {
                has_member = true;
                break;
            }
            // element drops out of the window
            start += 1;
        }

        // update stride so we can fast-forward in case of sparse data
        stride = if has_member {
            0
        } else {
            debug_assert!(start < time.len());
            iter.get_stride(time[start])
        };

        // find members of this window
        let mut end = start;

        // last value isn't always added
        // (only reached when the scan above ran up to the last value without finding a member)
        if end == time.len() - 1 {
            let t = time[end];
            if bi.is_member(t, closed_window) {
                if include_lower_bound {
                    lower_bound.push(bi.start);
                }
                if include_upper_bound {
                    upper_bound.push(bi.stop);
                }
                groups.push([end as IdxSize, 1])
            }
            continue;
        }
        // Extend the group while values stay inside the window's upper edge.
        for &t in &time[end..] {
            if !bi.is_member_exit(t, closed_window) {
                break;
            }
            end += 1;
        }
        let len = end - start;

        if include_lower_bound {
            lower_bound.push(bi.start);
        }
        if include_upper_bound {
            upper_bound.push(bi.stop);
        }
        groups.push([start as IdxSize, len as IdxSize])
    }
}

/// Window boundaries are created based on the given `Window`, which is defined by:
/// - every
/// - period
/// - offset
///
/// And every window boundary we search for the values that fit that window by the given
/// `ClosedWindow`. The groups are return as `GroupTuples` together with the lower bound and upper
/// bound timestamps. These timestamps indicate the start (lower) and end (upper) of the window of
/// that group.
///
/// If `include_boundaries` is `false` those `lower` and `upper` vectors will be empty.
#[allow(clippy::too_many_arguments)]
pub fn group_by_windows(
    window: Window,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: &Option<TimeZone>,
    include_lower_bound: bool,
    include_upper_bound: bool,
    start_by: StartBy,
) -> PolarsResult<(GroupsSlice, Vec<i64>, Vec<i64>)> {
    let start = time[0];
    // the boundary we define here is not yet correct. It doesn't take 'period' into account
    // and it doesn't have the proper starting point. This boundary is used as a proxy to find
    // the proper 'boundary' in  'window.get_overlapping_bounds_iter'.
    let boundary = if time.len() > 1 {
        // +1 because left or closed boundary could match the next window if it is on the boundary
        let stop = time[time.len() - 1] + 1;
        Bounds::new_checked(start, stop)
    } else {
        let stop = start + 1;
        Bounds::new_checked(start, stop)
    };

    let size = {
        match tu {
            TimeUnit::Nanoseconds => window.estimate_overlapping_bounds_ns(boundary),
            TimeUnit::Microseconds => window.estimate_overlapping_bounds_us(boundary),
            TimeUnit::Milliseconds => window.estimate_overlapping_bounds_ms(boundary),
        }
    };
    let size_lower = if include_lower_bound { size } else { 0 };
    let size_upper = if include_upper_bound { size } else { 0 };
    let mut lower_bound = Vec::with_capacity(size_lower);
    let mut upper_bound = Vec::with_capacity(size_upper);

    let mut groups = Vec::with_capacity(size);
    let start_offset = 0;

    match tz {
        #[cfg(feature = "timezones")]
        Some(tz) => {
            update_groups_and_bounds(
                window.get_overlapping_bounds_iter(
                    boundary,
                    closed_window,
                    tu,
                    tz.parse::<Tz>().ok().as_ref(),
                    start_by,
                )?,
                start_offset,
                time,
                closed_window,
                include_lower_bound,
                include_upper_bound,
                &mut lower_bound,
                &mut upper_bound,
                &mut groups,
            );
        },
        _ => {
            update_groups_and_bounds(
                window.get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by)?,
                start_offset,
                time,
                closed_window,
                include_lower_bound,
                include_upper_bound,
                &mut lower_bound,
                &mut upper_bound,
                &mut groups,
            );
        },
    };

    Ok((groups, lower_bound, upper_bound))
}

/// Rolling windows whose right endpoint sits exactly on each timestamp ("lookbehind"):
///
/// ```text
/// ------t---
/// [------]
/// ```
///
/// For every `t` in `time[start_offset..upper_bound]`, the window runs from `t + offset` to `t`,
/// with endpoints included according to `closed_window`. Requires `offset == -period`
/// (debug-asserted), which lets the upper bound be `t` itself instead of `t + offset + period`.
///
/// Yields one `(first_index, len)` pair per timestamp, indexing into the whole of `time`. The
/// first group may start before `start_offset` (it is located by binary search in
/// `time[..start_offset]`). `upper_bound` defaults to `time.len()`. An empty chunk (e.g. an
/// empty `time`) yields an empty iterator.
///
/// NOTE(review): `time` is assumed sorted ascending; the window cursors only move forward.
///
/// # Errors
/// Fails (or yields `Err` items) when applying `offset` to a timestamp fails.
#[inline]
#[allow(clippy::too_many_arguments)]
pub(crate) fn group_by_values_iter_lookbehind(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
    start_offset: usize,
    upper_bound: Option<usize>,
) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
    debug_assert!(offset.duration_ns() == period.duration_ns());
    debug_assert!(offset.negative);
    let add = match tu {
        TimeUnit::Nanoseconds => Duration::add_ns,
        TimeUnit::Microseconds => Duration::add_us,
        TimeUnit::Milliseconds => Duration::add_ms,
    };

    let upper_bound = upper_bound.unwrap_or(time.len());
    // Use binary search to find the initial start, as it lies behind `start_offset`.
    let mut start = if let Some(&t) = time.get(start_offset) {
        let lower = add(&offset, t, tz.as_ref())?;
        // We have `period == -offset`, so `t + offset + period` is equal to `t`,
        // and `upper` is trivially equal to `t` itself. Using the trivial calculation,
        // instead of `upper = lower + period`, avoids issues around
        // `t - 1mo + 1mo` not round-tripping.
        let upper = t;
        let b = Bounds::new(lower, upper);
        let slice = &time[..start_offset];
        slice.partition_point(|v| !b.is_member(*v, closed_window))
    } else {
        0
    };
    let mut end = start;
    // Seed for the duplicate fast path. It is never compared for the first element (`i > 0`
    // guard below), so don't index here: an empty chunk must yield an empty iterator, not panic.
    let mut last = time.get(start_offset).copied().unwrap_or_default();
    Ok(time[start_offset..upper_bound]
        .iter()
        .enumerate()
        .map(move |(mut i, t)| {
            // Fast path for duplicates: same window as the previous element.
            if *t == last && i > 0 {
                let len = end - start;
                let offset = start as IdxSize;
                return Ok((offset, len as IdxSize));
            }
            last = *t;
            // Turn the chunk-relative index into an index into `time`.
            i += start_offset;

            let lower = add(&offset, *t, tz.as_ref())?;
            let upper = *t;

            let b = Bounds::new(lower, upper);

            // Drop values from the front that are not inside the new lower bound.
            // SAFETY: `start <= i < time.len()`. `start` begins at most at `start_offset` (a
            // partition point of `time[..start_offset]`) and only advances within `start..i`.
            // `i` never decreases, and `i < upper_bound <= time.len()` because
            // `time[start_offset..upper_bound]` was sliced above.
            for &t in unsafe { time.get_unchecked(start..i) } {
                if b.is_member_entry(t, closed_window) {
                    break;
                }
                start += 1;
            }

            // faster path, check if `i` is member.
            if b.is_member_exit(*t, closed_window) {
                end = i;
            } else {
                end = std::cmp::max(end, start);
            }
            // we still must loop to consume duplicates
            // SAFETY: `end <= time.len()`. It is either `i`, the max of two in-bounds indices,
            // or advanced only while iterating over `time[end..]`.
            for &t in unsafe { time.get_unchecked(end..) } {
                if !b.is_member_exit(t, closed_window) {
                    break;
                }
                end += 1;
            }

            let len = end - start;
            let offset = start as IdxSize;

            Ok((offset, len as IdxSize))
        }))
}

/// Rolling windows that lie completely behind each timestamp, with `t` itself not a member:
///
/// ```text
/// ---------------t---
///  [---]
/// ```
///
/// For every `t` in `time`, the window runs from `t + offset` to `t + offset + period`, with
/// endpoints included according to `closed_window`. This variant is correct for all
/// lookbehind/lookahead configurations, but is slower than the specialized ones.
///
/// Yields one `(first_index, len)` pair per timestamp. A window for which
/// `Bounds::is_future(time[0], …)` holds yields `(0, 0)`. An empty `time` yields an empty
/// iterator.
///
/// NOTE(review): `time` is assumed sorted ascending; the window cursors only move forward.
///
/// Items are `Err` when applying `offset`/`period` to a timestamp fails.
pub(crate) fn group_by_values_iter_window_behind_t(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
    let add = match tu {
        TimeUnit::Nanoseconds => Duration::add_ns,
        TimeUnit::Microseconds => Duration::add_us,
        TimeUnit::Milliseconds => Duration::add_ms,
    };

    let mut start = 0;
    let mut end = start;
    // Seed for the duplicate fast path. It is never compared before `started` is set, so don't
    // index here: an empty `time` must yield an empty iterator instead of panicking.
    let mut last = time.first().copied().unwrap_or_default();
    let mut started = false;
    time.iter().map(move |lower| {
        // Fast path for duplicates: same window as the previous element.
        if *lower == last && started {
            let len = end - start;
            let offset = start as IdxSize;
            return Ok((offset, len as IdxSize));
        }
        last = *lower;
        started = true;
        let lower = add(&offset, *lower, tz.as_ref())?;
        let upper = add(&period, lower, tz.as_ref())?;

        let b = Bounds::new(lower, upper);
        // `time` is non-empty here: this closure only runs for an element of `time`.
        if b.is_future(time[0], closed_window) {
            Ok((0, 0))
        } else {
            // Drop values from the front that are not inside the lower bound.
            for &t in &time[start..] {
                if b.is_member_entry(t, closed_window) {
                    break;
                }
                start += 1;
            }

            // Extend the group while values stay inside the upper bound.
            end = std::cmp::max(start, end);
            for &t in &time[end..] {
                if !b.is_member_exit(t, closed_window) {
                    break;
                }
                end += 1;
            }

            let len = end - start;
            let offset = start as IdxSize;

            Ok((offset, len as IdxSize))
        }
    })
}

/// Rolling windows that start within one period behind each timestamp ("partial lookbehind"):
///
/// ```text
/// ----t---
///  [---]
/// ```
///
/// For every `t` in `time`, the window runs from `t + offset` to `t + offset + period`, with
/// endpoints included according to `closed_window`. The group start never advances past the
/// current element's own index.
///
/// Yields one `(first_index, len)` pair per timestamp. An empty `time` yields an empty iterator.
///
/// NOTE(review): `time` is assumed sorted ascending; the window cursors only move forward.
///
/// Items are `Err` when applying `offset`/`period` to a timestamp fails.
pub(crate) fn group_by_values_iter_partial_lookbehind(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
    let add = match tu {
        TimeUnit::Nanoseconds => Duration::add_ns,
        TimeUnit::Microseconds => Duration::add_us,
        TimeUnit::Milliseconds => Duration::add_ms,
    };

    let mut start = 0;
    let mut end = start;
    // Seed for the duplicate fast path. It is never compared for the first element (`i > 0`
    // guard below), so don't index here: an empty `time` must yield an empty iterator.
    let mut last = time.first().copied().unwrap_or_default();
    time.iter().enumerate().map(move |(i, lower)| {
        // Fast path for duplicates: same window as the previous element.
        if *lower == last && i > 0 {
            let len = end - start;
            let offset = start as IdxSize;
            return Ok((offset, len as IdxSize));
        }
        last = *lower;

        let lower = add(&offset, *lower, tz.as_ref())?;
        let upper = add(&period, lower, tz.as_ref())?;

        let b = Bounds::new(lower, upper);

        // Drop values from the front that are not inside the lower bound, but never move past
        // the current element itself.
        for &t in &time[start..] {
            if b.is_member_entry(t, closed_window) || start == i {
                break;
            }
            start += 1;
        }

        // Extend the group while values stay inside the upper bound.
        end = std::cmp::max(start, end);
        for &t in &time[end..] {
            if !b.is_member_exit(t, closed_window) {
                break;
            }
            end += 1;
        }

        let len = end - start;
        let offset = start as IdxSize;

        Ok((offset, len as IdxSize))
    })
}

/// Rolling windows that lie ahead of each timestamp ("lookahead"):
///
/// ```text
/// --t-----------
///        [---]
/// ```
///
/// For every `t` in `time[start_offset..upper_bound]`, the window runs from `t + offset` to
/// `t + offset + period`, with endpoints included according to `closed_window`. The search for
/// members starts at `start_offset`, so no group starts before it. `upper_bound` defaults to
/// `time.len()`.
///
/// Yields one `(first_index, len)` pair per timestamp, indexing into the whole of `time`. An
/// empty chunk (e.g. an empty `time`) yields an empty iterator.
///
/// NOTE(review): `time` is assumed sorted ascending; the window cursors only move forward.
///
/// Items are `Err` when applying `offset`/`period` to a timestamp fails.
#[allow(clippy::too_many_arguments)]
pub(crate) fn group_by_values_iter_lookahead(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
    start_offset: usize,
    upper_bound: Option<usize>,
) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
    let upper_bound = upper_bound.unwrap_or(time.len());

    let add = match tu {
        TimeUnit::Nanoseconds => Duration::add_ns,
        TimeUnit::Microseconds => Duration::add_us,
        TimeUnit::Milliseconds => Duration::add_ms,
    };
    let mut start = start_offset;
    let mut end = start;

    // Seed for the duplicate fast path. It is never compared before `started` is set, so don't
    // index here: an empty chunk must yield an empty iterator instead of panicking.
    let mut last = time.get(start_offset).copied().unwrap_or_default();
    let mut started = false;
    time[start_offset..upper_bound].iter().map(move |lower| {
        // Fast path for duplicates: same window as the previous element.
        if *lower == last && started {
            let len = end - start;
            let offset = start as IdxSize;
            return Ok((offset, len as IdxSize));
        }
        started = true;
        last = *lower;

        let lower = add(&offset, *lower, tz.as_ref())?;
        let upper = add(&period, lower, tz.as_ref())?;

        let b = Bounds::new(lower, upper);

        // Drop values from the front that are not inside the lower bound.
        for &t in &time[start..] {
            if b.is_member_entry(t, closed_window) {
                break;
            }
            start += 1;
        }

        // Extend the group while values stay inside the upper bound.
        end = std::cmp::max(start, end);
        for &t in &time[end..] {
            if !b.is_member_exit(t, closed_window) {
                break;
            }
            end += 1;
        }

        let len = end - start;
        let offset = start as IdxSize;

        Ok((offset, len as IdxSize))
    })
}

/// Rolling windows over `time` that each end at their own timestamp and reach back `period`.
///
/// The window for `t` runs from `t - period` to `t`, with endpoints included according to
/// `closed_window`. This is the lookbehind kernel applied to the whole of `time`.
#[cfg(feature = "rolling_window_by")]
#[inline]
pub(crate) fn group_by_values_iter(
    period: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
    // Looking back exactly one period puts `t` at the right endpoint of its window.
    let lookback = {
        let mut negated = period;
        negated.negative = true;
        negated
    };
    let (whole_start, whole_end) = (0, None);
    group_by_values_iter_lookbehind(
        period,
        lookback,
        time,
        closed_window,
        tu,
        tz,
        whole_start,
        whole_end,
    )
}

/// Removes partition boundaries that would split a run of equal timestamps.
///
/// `thread_offsets` holds contiguous `(offset, len)` partitions of `time`. On return it holds
/// contiguous partitions covering all of `time`:
/// - the first partition starts at 0;
/// - every other original partition start is kept if and only if it lies strictly after the
///   previously kept start, inside `time`, and does not fall between two equal timestamps.
///
/// Dropped starts are merged into the preceding partition, so only the offending boundaries are
/// lost and all valid ones survive. Does nothing when `time` is empty or there is at most one
/// partition.
fn prune_splits_on_duplicates(time: &[i64], thread_offsets: &mut Vec<(usize, usize)>) {
    if time.is_empty() || thread_offsets.len() <= 1 {
        return;
    }

    let mut starts = Vec::with_capacity(thread_offsets.len());
    starts.push(0usize);
    let mut previous = 0usize;
    for &(start, _) in &thread_offsets[1..] {
        // `start > previous >= 0` makes `start - 1` valid, and `start < time.len()` makes
        // `time[start]` valid.
        let keep = start > previous && start < time.len() && time[start - 1] != time[start];
        if keep {
            starts.push(start);
            previous = start;
        }
    }

    // Rebuild contiguous `(offset, len)` pairs; the last partition runs to the end of `time`.
    let ends = starts
        .iter()
        .skip(1)
        .copied()
        .chain(std::iter::once(time.len()));
    let pruned: Vec<(usize, usize)> = starts
        .iter()
        .zip(ends)
        .map(|(&begin, end)| (begin, end - begin))
        .collect();
    debug_assert_eq!(pruned.iter().map(|w| w.1).sum::<usize>(), time.len());
    *thread_offsets = pruned;
}

/// Eagerly collects the lookbehind kernel (`group_by_values_iter_lookbehind`) into
/// `[first_index, len]` pairs, stopping at the first error.
#[allow(clippy::too_many_arguments)]
fn group_by_values_iter_lookbehind_collected(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
    start_offset: usize,
    upper_bound: Option<usize>,
) -> PolarsResult<Vec<[IdxSize; 2]>> {
    group_by_values_iter_lookbehind(
        period,
        offset,
        time,
        closed_window,
        tu,
        tz,
        start_offset,
        upper_bound,
    )?
    .map(|group| group.map(|(first, len)| [first, len]))
    .collect()
}

/// Eagerly collects the lookahead kernel (`group_by_values_iter_lookahead`) into
/// `[first_index, len]` pairs, stopping at the first error.
#[allow(clippy::too_many_arguments)]
pub(crate) fn group_by_values_iter_lookahead_collected(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
    start_offset: usize,
    upper_bound: Option<usize>,
) -> PolarsResult<Vec<[IdxSize; 2]>> {
    let groups = group_by_values_iter_lookahead(
        period,
        offset,
        time,
        closed_window,
        tu,
        tz,
        start_offset,
        upper_bound,
    );
    // `first` is already an `IdxSize`; no cast needed.
    groups
        .map(|group| group.map(|(first, len)| [first, len]))
        .collect::<PolarsResult<Vec<_>>>()
}

/// Different from `group_by_windows`, where define window buckets and search which values fit that
/// pre-defined bucket.
///
/// This function defines every window based on the:
///     - timestamp (lower bound)
///     - timestamp + period (upper bound)
/// where timestamps are the individual values in the array `time`
pub fn group_by_values(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
) -> PolarsResult<GroupsSlice> {
    if time.is_empty() {
        return Ok(GroupsSlice::from(vec![]));
    }

    let mut thread_offsets = _split_offsets(time.len(), POOL.current_num_threads());
    // there are duplicates in the splits, so we opt for a single partition
    prune_splits_on_duplicates(time, &mut thread_offsets);

    // If we start from within parallel work we will do this single threaded.
    let run_parallel = !POOL.current_thread_has_pending_tasks().unwrap_or(false);

    // we have a (partial) lookbehind window
    if offset.negative && !offset.is_zero() {
        // lookbehind
        if offset.duration_ns() == period.duration_ns() {
            // t is right at the end of the window
            // ------t---
            // [------]
            if !run_parallel {
                let vecs = group_by_values_iter_lookbehind_collected(
                    period,
                    offset,
                    time,
                    closed_window,
                    tu,
                    tz,
                    0,
                    None,
                )?;
                return Ok(GroupsSlice::from(vecs));
            }

            POOL.install(|| {
                let vals = thread_offsets
                    .par_iter()
                    .copied()
                    .map(|(base_offset, len)| {
                        let upper_bound = base_offset + len;
                        group_by_values_iter_lookbehind_collected(
                            period,
                            offset,
                            time,
                            closed_window,
                            tu,
                            tz,
                            base_offset,
                            Some(upper_bound),
                        )
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;
                Ok(flatten_par(&vals))
            })
        } else if ((offset.duration_ns() >= period.duration_ns())
            && matches!(closed_window, ClosedWindow::Left | ClosedWindow::None))
            || ((offset.duration_ns() > period.duration_ns())
                && matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both))
        {
            // window is completely behind t and t itself is not a member
            // ---------------t---
            //  [---]
            let iter =
                group_by_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz);
            iter.map(|result| result.map(|(offset, len)| [offset, len]))
                .collect::<PolarsResult<_>>()
        }
        // partial lookbehind
        // this one is still single threaded
        // can make it parallel later, its a bit more complicated because the boundaries are unknown
        // window is with -1 periods of t
        // ----t---
        //  [---]
        else {
            let iter = group_by_values_iter_partial_lookbehind(
                period,
                offset,
                time,
                closed_window,
                tu,
                tz,
            );
            iter.map(|result| result.map(|(offset, len)| [offset, len]))
                .collect::<PolarsResult<_>>()
        }
    } else if !offset.is_zero()
        || closed_window == ClosedWindow::Right
        || closed_window == ClosedWindow::None
    {
        // window is completely ahead of t and t itself is not a member
        // --t-----------
        //        [---]

        if !run_parallel {
            let vecs = group_by_values_iter_lookahead_collected(
                period,
                offset,
                time,
                closed_window,
                tu,
                tz,
                0,
                None,
            )?;
            return Ok(GroupsSlice::from(vecs));
        }

        POOL.install(|| {
            let vals = thread_offsets
                .par_iter()
                .copied()
                .map(|(base_offset, len)| {
                    let lower_bound = base_offset;
                    let upper_bound = base_offset + len;
                    group_by_values_iter_lookahead_collected(
                        period,
                        offset,
                        time,
                        closed_window,
                        tu,
                        tz,
                        lower_bound,
                        Some(upper_bound),
                    )
                })
                .collect::<PolarsResult<Vec<_>>>()?;
            Ok(flatten_par(&vals))
        })
    } else {
        if !run_parallel {
            let vecs = group_by_values_iter_lookahead_collected(
                period,
                offset,
                time,
                closed_window,
                tu,
                tz,
                0,
                None,
            )?;
            return Ok(GroupsSlice::from(vecs));
        }

        // Offset is 0 and window is closed on the left:
        // it must be that the window starts at t and t is a member
        // --t-----------
        //  [---]
        POOL.install(|| {
            let vals = thread_offsets
                .par_iter()
                .copied()
                .map(|(base_offset, len)| {
                    let lower_bound = base_offset;
                    let upper_bound = base_offset + len;
                    group_by_values_iter_lookahead_collected(
                        period,
                        offset,
                        time,
                        closed_window,
                        tu,
                        tz,
                        lower_bound,
                        Some(upper_bound),
                    )
                })
                .collect::<PolarsResult<Vec<_>>>()?;
            Ok(flatten_par(&vals))
        })
    }
}

/// Incrementally computes rolling windows over timestamps that arrive in batches.
///
/// Every inserted timestamp `t` opens the window from `t + offset` to `t + offset + period`
/// (just `t` as the upper bound when `offset == -period`), with bounds included according to
/// `closed`. A window is emitted as `[start, len]` (absolute indices into the stream of all
/// inserted timestamps) in one of two cases:
/// - an inserted timestamp is no longer below its upper bound;
/// - `finalize` is called.
pub struct RollingWindower {
    period: Duration,
    offset: Duration,
    closed: ClosedWindow,

    // Duration-add function for the configured time unit (`add_ns`/`add_us`/`add_ms`).
    add: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
    tz: Option<Tz>,

    // Absolute index of the first member of the most recently emitted window; values before it
    // are not referenced again.
    start: IdxSize,
    // Absolute index one past the last member of the most recently emitted window.
    end: IdxSize,
    // Total number of timestamps inserted so far.
    length: IdxSize,

    // Windows that have been opened but not yet emitted, oldest first.
    active: VecDeque<ActiveWindow>,
}

/// A window opened by `RollingWindower::insert` that has not been emitted yet.
struct ActiveWindow {
    // Lower bound of the window (`t + offset`).
    start: i64,
    // Upper bound of the window (`t + offset + period`, or `t` itself when `offset == -period`).
    end: i64,
}

impl ActiveWindow {
    /// Returns `true` when `t` is on the inner side of this window's lower edge.
    ///
    /// `t == start` only counts when `closed` makes the left endpoint inclusive.
    #[inline(always)]
    fn above_lower_bound(&self, t: i64, closed: ClosedWindow) -> bool {
        let left_inclusive = matches!(closed, ClosedWindow::Left | ClosedWindow::Both);
        if left_inclusive {
            t >= self.start
        } else {
            t > self.start
        }
    }

    /// Returns `true` when `t` is on the inner side of this window's upper edge.
    ///
    /// `t == end` only counts when `closed` makes the right endpoint inclusive.
    #[inline(always)]
    fn below_upper_bound(&self, t: i64, closed: ClosedWindow) -> bool {
        let right_inclusive = matches!(closed, ClosedWindow::Right | ClosedWindow::Both);
        if right_inclusive {
            t <= self.end
        } else {
            t < self.end
        }
    }
}

/// Locates the element `n` positions into the concatenation of the slices in `l`.
///
/// Returns `(x, y)` such that the element is `l[y][x]`. Empty slices are skipped, so `y` points
/// at a non-empty slice whenever one remains. If `n` equals the total length, `(0, l.len())` is
/// returned.
///
/// # Panics
/// If `n` exceeds the total number of elements in `l`.
fn skip_in_2d_list(l: &[&[i64]], mut n: usize) -> (usize, usize) {
    let mut y = 0;
    // An empty slice always satisfies `n >= len`, so it is skipped as well.
    while let Some(row) = l.get(y) {
        if n < row.len() {
            break;
        }
        n -= row.len();
        y += 1;
    }
    assert!(n == 0 || y < l.len());
    (n, y)
}
/// Advances the 2-D cursor `(x, y)` over the slices in `l` by one element.
///
/// When the current slice is exhausted, the cursor rolls over into the next non-empty slice, or
/// to `(0, l.len())` past the end.
fn increment_2d(x: &mut usize, y: &mut usize, l: &[&[i64]]) {
    *x += 1;
    while l.get(*y).is_some_and(|row| row.len() == *x) {
        *x = 0;
        *y += 1;
    }
}

impl RollingWindower {
    /// Creates an empty windower. `tu` selects the duration-add function used for `offset` and
    /// `period`.
    pub fn new(
        period: Duration,
        offset: Duration,
        closed: ClosedWindow,
        tu: TimeUnit,
        tz: Option<Tz>,
    ) -> Self {
        Self {
            period,
            offset,
            closed,

            add: match tu {
                TimeUnit::Nanoseconds => Duration::add_ns,
                TimeUnit::Microseconds => Duration::add_us,
                TimeUnit::Milliseconds => Duration::add_ms,
            },
            tz,

            start: 0,
            end: 0,
            length: 0,

            active: Default::default(),
        }
    }

    /// Insert new values into the windower.
    ///
    /// This should be given all the old values that were not processed yet.
    ///
    /// The slices in `time`, read as one sequence, must begin at absolute index `self.start`
    /// and contain the `length - start` values already inserted, followed by the new values.
    /// For each new value `t`, a window is opened. Every pending window that `t` is no longer
    /// below is emitted to `windows` as `[start, len]`.
    ///
    /// Returns how far `self.start` advanced during this call, i.e. the number of leading
    /// values of `time` that are no longer referenced (presumably so the caller can drop them).
    ///
    /// # Errors
    /// Propagates failures of the duration-add function.
    pub fn insert(
        &mut self,
        time: &[&[i64]],
        windows: &mut Vec<[IdxSize; 2]>,
    ) -> PolarsResult<IdxSize> {
        // 2-D cursors `(x, y)` into `time`: `i` = next new value, `s` = absolute `self.start`,
        // `e` = absolute `self.end`.
        let (mut i_x, mut i_y) = skip_in_2d_list(time, (self.length - self.start) as usize);
        let (mut s_x, mut s_y) = skip_in_2d_list(time, 0); // skip over empty lists
        let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);

        let time_start = self.start;
        // Absolute index of the value at cursor `i`.
        let mut i = self.length;
        while i_y < time.len() {
            let t = time[i_y][i_x];
            let window_start = (self.add)(&self.offset, t, self.tz.as_ref())?;
            // For datetime arithmetic, it does *NOT* hold 0 + a - a == 0. Therefore, we make sure
            // that if `offset` and `period` are inverses we keep the `t`.
            let window_end = if self.offset == -self.period {
                t
            } else {
                (self.add)(&self.period, window_start, self.tz.as_ref())?
            };

            self.active.push_back(ActiveWindow {
                start: window_start,
                end: window_end,
            });

            // Emit every pending window (oldest first) whose upper bound `t` no longer lies
            // below.
            while let Some(w) = self.active.front() {
                if w.below_upper_bound(t, self.closed) {
                    break;
                }

                let w = self.active.pop_front().unwrap();
                // Move the shared start/end cursors to this window's bounds, never past the
                // current value `i`.
                while self.start < i && !w.above_lower_bound(time[s_y][s_x], self.closed) {
                    increment_2d(&mut s_x, &mut s_y, time);
                    self.start += 1;
                }
                while self.end < i && w.below_upper_bound(time[e_y][e_x], self.closed) {
                    increment_2d(&mut e_x, &mut e_y, time);
                    self.end += 1;
                }
                windows.push([self.start, self.end - self.start]);
            }

            increment_2d(&mut i_x, &mut i_y, time);
            i += 1;
        }

        self.length = i;
        Ok(self.start - time_start)
    }

    /// Process all remaining items and signal that no more items are coming.
    ///
    /// `time` must hold exactly the values from absolute index `self.start` up to
    /// `self.length` (asserted). Every still-pending window is emitted to `windows`, then the
    /// counters are reset to 0.
    pub fn finalize(&mut self, time: &[&[i64]], windows: &mut Vec<[IdxSize; 2]>) {
        assert_eq!(
            time.iter().map(|t| t.len()).sum::<usize>() as IdxSize,
            self.length - self.start
        );

        let (mut s_x, mut s_y) = skip_in_2d_list(time, 0);
        let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);

        // Same cursor advancement as in `insert`, but bounded by the total length since no
        // further values will arrive.
        windows.extend(self.active.drain(..).map(|w| {
            while self.start < self.length && !w.above_lower_bound(time[s_y][s_x], self.closed) {
                increment_2d(&mut s_x, &mut s_y, time);
                self.start += 1;
            }
            while self.end < self.length && w.below_upper_bound(time[e_y][e_x], self.closed) {
                increment_2d(&mut e_x, &mut e_y, time);
                self.end += 1;
            }
            [self.start, self.end - self.start]
        }));

        self.start = 0;
        self.end = 0;
        self.length = 0;
    }

    /// Drops all pending windows and resets the counters without emitting anything.
    pub fn reset(&mut self) {
        self.active.clear();
        self.start = 0;
        self.end = 0;
        self.length = 0;
    }
}

/// A window that is still open in `GroupByDynamicWindower`.
///
/// NOTE(review): this struct is used outside this chunk. Presumably `start` is the absolute
/// index of the window's first member and `lower_bound`/`upper_bound` are its time bounds —
/// confirm.
#[derive(Debug)]
struct ActiveDynWindow {
    start: IdxSize,
    lower_bound: i64,
    upper_bound: i64,
}

/// Returns `true` when `t` is on the inner side of the lower bound `lb`.
///
/// `t == lb` only counts when `closed` makes the left endpoint inclusive.
#[inline(always)]
fn is_above_lower_bound(t: i64, lb: i64, closed: ClosedWindow) -> bool {
    match closed {
        ClosedWindow::Left | ClosedWindow::Both => t >= lb,
        ClosedWindow::Right | ClosedWindow::None => t > lb,
    }
}
/// Returns `true` when `t` is on the inner side of the upper bound `ub`.
///
/// `t == ub` only counts when `closed` makes the right endpoint inclusive.
#[inline(always)]
fn is_below_upper_bound(t: i64, ub: i64, closed: ClosedWindow) -> bool {
    match closed {
        ClosedWindow::Right | ClosedWindow::Both => t <= ub,
        ClosedWindow::Left | ClosedWindow::None => t < ub,
    }
}

/// Incremental windower configured like `group_by_windows`.
///
/// It is parameterized by `every`, `period`, `offset`, `start_by` and `closed`.
///
/// NOTE(review): most of its methods are outside this chunk. The field notes marked
/// "presumably" are inferred from names and should be confirmed.
pub struct GroupByDynamicWindower {
    period: Duration,
    offset: Duration,
    every: Duration,
    closed: ClosedWindow,

    start_by: StartBy,

    // Duration-add function for the configured time unit (`add_ns`/`add_us`/`add_ms`).
    add: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
    // Not-to-exceed duration (upper limit).
    // Selected per time unit in `new` (`nte_duration_ns`/`_us`/`_ms`).
    nte: fn(&Duration) -> i64,
    tu: TimeUnit,
    tz: Option<Tz>,

    // Presumably whether window lower/upper bounds are reported, as in `group_by_windows`.
    include_lower_bound: bool,
    include_upper_bound: bool,

    // Presumably the number of timestamps processed so far.
    num_seen: IdxSize,
    // Presumably the lower bound of the next window to open.
    next_lower_bound: i64,
    // Presumably the currently open windows, oldest first.
    active: VecDeque<ActiveDynWindow>,
}

impl GroupByDynamicWindower {
    /// Creates a windower for timestamps expressed in `tu`.
    ///
    /// Window lower bounds advance by `every` and each window spans `period`. The first lower
    /// bound is derived from `start_by`; every strategy except `StartBy::DataPoint` also uses
    /// `offset`. `closed` selects which window edges are inclusive. When `include_lower_bound`
    /// or `include_upper_bound` is set, every emitted window also appends that bound to the
    /// corresponding output vector of `insert`/`finalize`.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        period: Duration,
        offset: Duration,
        every: Duration,
        start_by: StartBy,
        closed: ClosedWindow,
        tu: TimeUnit,
        tz: Option<Tz>,
        include_lower_bound: bool,
        include_upper_bound: bool,
    ) -> Self {
        Self {
            period,
            offset,
            every,
            closed,

            start_by,

            add: match tu {
                TimeUnit::Nanoseconds => Duration::add_ns,
                TimeUnit::Microseconds => Duration::add_us,
                TimeUnit::Milliseconds => Duration::add_ms,
            },
            nte: match tu {
                TimeUnit::Nanoseconds => Duration::nte_duration_ns,
                TimeUnit::Microseconds => Duration::nte_duration_us,
                TimeUnit::Milliseconds => Duration::nte_duration_ms,
            },
            tu,
            tz,

            include_lower_bound,
            include_upper_bound,

            num_seen: 0,
            next_lower_bound: 0,
            active: Default::default(),
        }
    }

    /// Finds the first window, starting from `lower_bound` and advancing in whole `every`
    /// steps, whose upper bound is not at or before `target`.
    ///
    /// Returns `Ok(Ok((lower, upper)))` when `target` also lies above that window's lower
    /// bound, i.e. `target` is inside it. Otherwise `target` falls in a gap before the
    /// window, and the result is `Ok(Err(lower))`; `lower` is the lower bound to retry from.
    ///
    /// # Errors
    /// Propagates errors from duration arithmetic (`self.add`).
    pub fn find_first_window_around(
        &self,
        mut lower_bound: i64,
        target: i64,
    ) -> PolarsResult<Result<(i64, i64), i64>> {
        // Upper limits on the length of one `every` step and one `period`, in `self.tu` units.
        // `self.nte` is the `nte_duration_*` function for `self.tu`. These values do not
        // change inside the loop below.
        let every_nte = (self.nte)(&self.every);
        let period_nte = (self.nte)(&self.period);

        let mut upper_bound = (self.add)(&self.period, lower_bound, self.tz.as_ref())?;
        while !is_below_upper_bound(target, upper_bound, self.closed) {
            let gap = target - lower_bound;
            // If `target` is far away, jump several `every` steps at once. Both sizes are
            // upper limits, so this is meant to stay at or before the window containing
            // `target`. Otherwise take a single step. The `every_nte > 0` check avoids a
            // division by zero.
            let nth: i64 = if every_nte > 0 && gap > every_nte + period_nte {
                (gap - period_nte) / every_nte
            } else {
                1
            };

            lower_bound = (self.add)(&(self.every * nth), lower_bound, self.tz.as_ref())?;
            upper_bound = (self.add)(&self.period, lower_bound, self.tz.as_ref())?;
        }

        if is_above_lower_bound(target, lower_bound, self.closed) {
            Ok(Ok((lower_bound, upper_bound)))
        } else {
            Ok(Err(lower_bound))
        }
    }

    /// Computes the lower bound of the first window for a sequence whose first timestamp is
    /// `first`, according to `self.start_by`.
    ///
    /// # Errors
    /// Propagates errors from window-bound and duration arithmetic.
    fn start_lower_bound(&self, first: i64) -> PolarsResult<i64> {
        match self.start_by {
            StartBy::DataPoint => Ok(first),
            StartBy::WindowBound => {
                let get_earliest_bounds = match self.tu {
                    TimeUnit::Nanoseconds => Window::get_earliest_bounds_ns,
                    TimeUnit::Microseconds => Window::get_earliest_bounds_us,
                    TimeUnit::Milliseconds => Window::get_earliest_bounds_ms,
                };
                Ok((get_earliest_bounds)(
                    &Window::new(self.every, self.period, self.offset),
                    first,
                    self.closed,
                    self.tz.as_ref(),
                )?
                .start)
            },
            StartBy::Monday
            | StartBy::Tuesday
            | StartBy::Wednesday
            | StartBy::Thursday
            | StartBy::Friday
            | StartBy::Saturday
            | StartBy::Sunday => {
                let weekday = self
                    .start_by
                    .weekday()
                    .expect("weekday variants of StartBy always map to a weekday");
                self.weekday_start_lower_bound(first, weekday)
            },
        }
    }

    /// Computes the first window's lower bound for the weekday `StartBy` variants.
    ///
    /// Starts from the beginning of the week containing `first`, as given by
    /// `DateTimeNow::beginning_of_week`. It then adds `weekday` days and `self.offset`, and
    /// finally lets `ensure_t_in_or_in_front_of_window` adjust the result so `first` can fall
    /// inside a window. `weekday` is the value returned by `StartBy::weekday`
    /// (0 = Monday, ..., 6 = Sunday).
    ///
    /// # Errors
    /// Propagates errors from duration arithmetic.
    fn weekday_start_lower_bound(&self, first: i64, weekday: u32) -> PolarsResult<i64> {
        #[allow(clippy::type_complexity)]
        let (from, to): (fn(i64) -> NaiveDateTime, fn(NaiveDateTime) -> i64) = match self.tu {
            TimeUnit::Nanoseconds => (timestamp_ns_to_datetime, datetime_to_timestamp_ns),
            TimeUnit::Microseconds => (timestamp_us_to_datetime, datetime_to_timestamp_us),
            TimeUnit::Milliseconds => (timestamp_ms_to_datetime, datetime_to_timestamp_ms),
        };
        let dt = from(first);

        // Beginning of the week containing `first`, converted back to a naive UTC datetime,
        // together with the time zone to use for the arithmetic that follows. As before, the
        // zone-less path passes `None`.
        let (week_start, tz): (NaiveDateTime, Option<&Tz>) = match self.tz.as_ref() {
            #[cfg(feature = "timezones")]
            Some(tz) => (
                tz.from_utc_datetime(&dt).beginning_of_week().naive_utc(),
                Some(tz),
            ),
            _ => (
                // UTC has no ambiguous or non-existent local times, so this cannot fail.
                dt.and_local_timezone(chrono::Utc)
                    .unwrap()
                    .beginning_of_week()
                    .naive_utc(),
                None,
            ),
        };

        // Shift from the beginning of the week to the requested day of the week.
        let start = (self.add)(&Duration::parse(&format!("{weekday}d")), to(week_start), tz)?;
        // Apply the user-provided 'offset'.
        let start = (self.add)(&self.offset, start, tz)?;
        // Make sure the first data point has a chance to be included, and let the helper
        // compute the window defined by the 'period'.
        Ok(ensure_t_in_or_in_front_of_window(
            self.every,
            first,
            self.add,
            self.nte,
            self.period,
            start,
            self.closed,
            tz,
        )?
        .start)
    }

    /// Feeds the next chunk of timestamps into the windower.
    ///
    /// When a timestamp is no longer below an active window's upper bound, that window is
    /// closed. Closed windows are emitted to `windows` as `[start, len]`, with indices counted
    /// over all timestamps seen since the last `finalize`. Their bounds are also appended to
    /// `lower_bound` / `upper_bound` when the matching `include_*` flag is set. Windows that
    /// are still open stay active until a later `insert` or `finalize`.
    ///
    /// NOTE(review): the window bookkeeping assumes timestamps arrive in ascending order
    /// across calls — presumably guaranteed by the caller.
    ///
    /// # Errors
    /// Propagates errors from computing window bounds.
    pub fn insert(
        &mut self,
        time: &[i64],
        windows: &mut Vec<[IdxSize; 2]>,
        lower_bound: &mut Vec<i64>,
        upper_bound: &mut Vec<i64>,
    ) -> PolarsResult<()> {
        if time.is_empty() {
            return Ok(());
        }

        // The first timestamp of a fresh sequence determines where windows start.
        if self.num_seen == 0 {
            debug_assert!(self.active.is_empty());
            self.next_lower_bound = self.start_lower_bound(time[0])?;
        }

        for &t in time {
            // Emit every window that `t` has moved past. The front window is checked first,
            // because windows are opened in order.
            while let Some(w) = self.active.front()
                && !is_below_upper_bound(t, w.upper_bound, self.closed)
            {
                let w = self.active.pop_front().unwrap();
                windows.push([w.start, self.num_seen - w.start]);
                if self.include_lower_bound {
                    lower_bound.push(w.lower_bound);
                }
                if self.include_upper_bound {
                    upper_bound.push(w.upper_bound);
                }
            }

            // Open every window that starts at or before `t` and contains it.
            while is_above_lower_bound(t, self.next_lower_bound, self.closed) {
                match self.find_first_window_around(self.next_lower_bound, t)? {
                    Ok((lower_bound, upper_bound)) => {
                        self.next_lower_bound =
                            (self.add)(&self.every, lower_bound, self.tz.as_ref())?;
                        self.active.push_back(ActiveDynWindow {
                            start: self.num_seen,
                            lower_bound,
                            upper_bound,
                        });
                    },
                    // `t` falls in a gap between windows; resume from this bound next time.
                    Err(lower_bound) => {
                        self.next_lower_bound = lower_bound;
                        break;
                    },
                }
            }

            self.num_seen += 1;
        }

        Ok(())
    }

    /// Returns the smallest timestamp index that an active window still refers to. If no
    /// window is active, returns the number of timestamps seen so far.
    pub fn lowest_needed_index(&self) -> IdxSize {
        self.active.front().map_or(self.num_seen, |w| w.start)
    }

    /// Emits every still-active window, ending each one at the last timestamp seen. Then
    /// resets the windower so it can process a new, independent sequence.
    pub fn finalize(
        &mut self,
        windows: &mut Vec<[IdxSize; 2]>,
        lower_bound: &mut Vec<i64>,
        upper_bound: &mut Vec<i64>,
    ) {
        for w in self.active.drain(..) {
            windows.push([w.start, self.num_seen - w.start]);
            if self.include_lower_bound {
                lower_bound.push(w.lower_bound);
            }
            if self.include_upper_bound {
                upper_bound.push(w.upper_bound);
            }
        }

        self.next_lower_bound = 0;
        self.num_seen = 0;
    }

    /// Number of timestamps consumed since the last `finalize`.
    pub fn num_seen(&self) -> IdxSize {
        self.num_seen
    }

    /// Unit of the timestamps this windower operates on.
    pub fn time_unit(&self) -> TimeUnit {
        self.tu
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_prune_duplicates() {
        // Splits are `(offset, len)` pairs over `ts`:
        //
        //   index: 0  1  2  3  4  5  6  7  8  9  10
        //   value: 0, 1, 1, 2, 2, 2, 3, 4, 5, 6, 5
        //
        // As asserted below, the boundary at index 2 (where ts[1] == ts[2]) is pruned, so
        // (0, 2) and (2, 4) merge into (0, 6). The boundaries at 6 and 8 separate distinct
        // values and are kept.
        let ts = &[0, 1, 1, 2, 2, 2, 3, 4, 5, 6, 5];
        let mut parts = vec![(0, 2), (2, 4), (6, 2), (8, 3)];
        prune_splits_on_duplicates(ts, &mut parts);
        assert_eq!(parts, &[(0, 6), (6, 2), (8, 3)]);
    }
}
